package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
 * Demo job showing how to plug a hand-written {@link SourceFunction} into a Flink
 * streaming pipeline: rows are read from a MySQL table over JDBC and printed.
 *
 * @author SHUJIA
 */
public class Demo5MySqlSource {
    /**
     * Builds and runs the job: custom MySQL source -> print sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the custom source.
        DataStreamSource<String> mysqlSource = env.addSource(new MySqlSource());

        mysqlSource.print();

        env.execute();
    }

    /**
     * Source that runs a single query against the {@code student} table and emits
     * every row as one comma-separated line: {@code id,name,age,gender,clazz}.
     * {@link #run} returns once the last row has been emitted or the source has
     * been cancelled.
     */
    static class MySqlSource implements SourceFunction<String> {
        private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
        private static final String JDBC_URL = "jdbc:mysql://master:3306/student";
        private static final String USER = "root";
        private static final String PASSWORD = "123456";
        private static final String QUERY = "select id,name,age,gender,clazz from student";

        /**
         * Cleared by {@link #cancel()}. Declared volatile because cancel() is invoked
         * from a different thread than the one executing {@link #run}.
         */
        private volatile boolean running = true;

        /**
         * Reads the table over JDBC and forwards each row downstream.
         *
         * @param ctx context used to emit records
         * @throws Exception if the driver cannot be loaded or any JDBC call fails
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // 1. Load the JDBC driver.
            Class.forName(DRIVER_CLASS);

            // 2-4. Open the connection, prepare the statement and execute the query.
            // try-with-resources closes all three (in reverse order) on normal
            // completion, on cancellation and when an exception is thrown.
            try (Connection con = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
                 PreparedStatement stat = con.prepareStatement(QUERY);
                 ResultSet resultSet = stat.executeQuery()) {

                // 5. Walk the result set; stop early if the job was cancelled.
                while (running && resultSet.next()) {
                    // Columns are looked up by name.
                    int id = resultSet.getInt("id");
                    String name = resultSet.getString("name");
                    int age = resultSet.getInt("age");
                    String gender = resultSet.getString("gender");
                    String clazz = resultSet.getString("clazz");

                    // Emit the row downstream.
                    ctx.collect(id + "," + name + "," + age + "," + gender + "," + clazz);
                }
            }
        }

        /**
         * Asks the read loop in {@link #run} to stop before fetching the next row.
         */
        @Override
        public void cancel() {
            running = false;
        }
    }
}
